use super::batch::{BatchInfo, BatchProcessingResult, BatchState};
use super::RangeSyncType;
use crate::metrics;
use crate::network_beacon_processor::ChainSegmentProcessId;
use crate::sync::network_context::{RangeRequestId, RpcRequestSendError, RpcResponseError};
use crate::sync::{network_context::SyncNetworkContext, BatchOperationOutcome, BatchProcessResult};
use beacon_chain::block_verification_types::RpcBlock;
use beacon_chain::BeaconChainTypes;
use lighthouse_network::service::api_types::Id;
use lighthouse_network::{PeerAction, PeerId};
use logging::crit;
use std::collections::{btree_map::Entry, BTreeMap, HashSet};
use strum::IntoStaticStr;
use tracing::{debug, instrument, warn};
use types::{Epoch, EthSpec, Hash256, Slot};

/// Blocks are downloaded in batches from peers. This constant specifies how many epochs worth of
/// blocks per batch are requested _at most_. A batch may request less blocks to account for
/// already requested slots. There is a timeout for each batch request. If this value is too high,
/// we will negatively report peers with poor bandwidth. This can be set arbitrarily high, in which
/// case the responder will fill the response up to the max request size, assuming they have the
/// bandwidth to do so.
pub const EPOCHS_PER_BATCH: u64 = 1;

/// The maximum number of batches to queue before requesting more.
const BATCH_BUFFER_SIZE: u8 = 5;

/// A return type for functions that act on a `Chain` which informs the caller whether the chain
/// has been completed and should be removed or to be kept if further processing is
/// required.
///
/// Should be checked, since a failed chain must be removed. A chain that requested being removed
/// and continued is now in an inconsistent state.
pub type ProcessingResult = Result<KeepChain, RemoveChain>;

/// Reasons for removing a chain
#[derive(Debug)]
#[allow(dead_code)]
pub enum RemoveChain {
    EmptyPeerPool,
    ChainCompleted,
    /// A chain has failed. This boolean signals whether the chain should be blacklisted.
    ChainFailed {
        blacklist: bool,
        failing_batch: BatchId,
    },
    WrongBatchState(String),
    WrongChainState(String),
}

#[derive(Debug)]
pub struct KeepChain;

/// A chain identifier
pub type ChainId = Id;
pub type BatchId = Epoch;

#[derive(Debug, Copy, Clone, IntoStaticStr)]
pub enum SyncingChainType {
    Head,
    Finalized,
    Backfill,
}

/// A chain of blocks that need to be downloaded. Peers who claim to contain the target head
/// root are grouped into the peer pool and queried for batches when downloading the
/// chain.
#[derive(Debug)]
pub struct SyncingChain<T: BeaconChainTypes> {
    /// A random id used to identify this chain.
    id: ChainId,

    /// SyncingChain type
    pub chain_type: SyncingChainType,

    /// The start of the chain segment. Any epoch previous to this one has been validated.
    pub start_epoch: Epoch,

    /// The target head slot.
    pub target_head_slot: Slot,

    /// The target head root.
    pub target_head_root: Hash256,

    /// Sorted map of batches undergoing some kind of processing.
    batches: BTreeMap<BatchId, BatchInfo<T::EthSpec>>,

    /// The peers that agree on the `target_head_slot` and `target_head_root` as a canonical chain
    /// and thus available to download this chain from, as well as the batches we are currently
    /// requesting.
    peers: HashSet<PeerId>,

    /// Starting epoch of the next batch that needs to be downloaded.
    to_be_downloaded: BatchId,

    /// Starting epoch of the batch that needs to be processed next.
    /// This is incremented as the chain advances.
    processing_target: BatchId,

    /// Optimistic head to sync.
    /// If a block is imported for this batch, the chain advances to this point.
    optimistic_start: Option<BatchId>,

    /// When a batch for an optimistic start is tried (either successful or not), it is stored to
    /// avoid trying it again due to chain stopping/re-starting on chain switching.
    attempted_optimistic_starts: HashSet<BatchId>,

    /// The current state of the chain.
    pub state: ChainSyncingState,

    /// The current processing batch, if any.
    current_processing_batch: Option<BatchId>,
}

#[derive(PartialEq, Debug)]
pub enum ChainSyncingState {
    /// The chain is not being synced.
    Stopped,
    /// The chain is undergoing syncing.
    Syncing,
}

impl<T: BeaconChainTypes> SyncingChain<T> {
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        id: Id,
        start_epoch: Epoch,
        target_head_slot: Slot,
        target_head_root: Hash256,
        peer_id: PeerId,
        chain_type: SyncingChainType,
    ) -> Self {
        SyncingChain {
            id,
            chain_type,
            start_epoch,
            target_head_slot,
            target_head_root,
            batches: BTreeMap::new(),
            peers: HashSet::from_iter([peer_id]),
            to_be_downloaded: start_epoch,
            processing_target: start_epoch,
            optimistic_start: None,
            attempted_optimistic_starts: HashSet::default(),
            state: ChainSyncingState::Stopped,
            current_processing_batch: None,
        }
    }

    /// Returns true if this chain has the same target
    pub fn has_same_target(&self, target_head_slot: Slot, target_head_root: Hash256) -> bool {
        self.target_head_slot == target_head_slot && self.target_head_root == target_head_root
    }

    /// Check if the chain has peers from which to process batches.
    #[instrument(parent = None,level = "info", fields(chain = self.id , service = "range_sync"), skip_all)]
    pub fn available_peers(&self) -> usize {
        self.peers.len()
    }

    /// Get the chain's id.
    #[instrument(parent = None,level = "info", fields(chain = self.id , service = "range_sync"), skip_all)]
    pub fn id(&self) -> ChainId {
        self.id
    }

    /// Peers currently syncing this chain.
    #[instrument(parent = None,level = "info", fields(chain = self.id , service = "range_sync"), skip_all)]
    pub fn peers(&self) -> impl Iterator<Item = PeerId> + '_ {
        self.peers.iter().cloned()
    }

    /// Progress in epochs made by the chain
    #[instrument(parent = None,level = "info", fields(chain = self.id , service = "range_sync"), skip_all)]
    pub fn processed_epochs(&self) -> u64 {
        self.processing_target
            .saturating_sub(self.start_epoch)
            .into()
    }

    /// Returns the total count of pending blocks in all the batches of this chain
    #[instrument(parent = None,level = "info", fields(chain = self.id , service = "range_sync"), skip_all)]
    pub fn pending_blocks(&self) -> usize {
        self.batches
            .values()
            .map(|batch| batch.pending_blocks())
            .sum()
    }

    /// Removes a peer from the chain.
    /// If the peer has active batches, those are considered failed and re-requested.
    #[instrument(parent = None,level = "info", fields(chain = self.id , service = "range_sync"), skip_all)]
    pub fn remove_peer(&mut self, peer_id: &PeerId) -> ProcessingResult {
        self.peers.remove(peer_id);

        if self.peers.is_empty() {
            Err(RemoveChain::EmptyPeerPool)
        } else {
            Ok(KeepChain)
        }
    }

    /// Returns the latest slot number that has been processed.
    #[instrument(parent = None,level = "info", fields(chain = self.id , service = "range_sync"), skip_all)]
    fn current_processed_slot(&self) -> Slot {
        // the last slot we processed was included in the previous batch, and corresponds to the
        // first slot of the current target epoch
        self.processing_target
            .start_slot(T::EthSpec::slots_per_epoch())
    }

    /// A block has been received for a batch on this chain.
    /// If the block correctly completes the batch it will be processed if possible.
    #[instrument(parent = None,level = "info", fields(chain = self.id , service = "range_sync"), skip_all)]
    pub fn on_block_response(
        &mut self,
        network: &mut SyncNetworkContext<T>,
        batch_id: BatchId,
        peer_id: &PeerId,
        request_id: Id,
        blocks: Vec<RpcBlock<T::EthSpec>>,
    ) -> ProcessingResult {
        // check if we have this batch
        let batch = match self.batches.get_mut(&batch_id) {
            None => {
                debug!(epoch = %batch_id, "Received a block for unknown batch");
                // A batch might get removed when the chain advances, so this is non fatal.
                return Ok(KeepChain);
            }
            Some(batch) => {
                // A batch could be retried without the peer failing the request (disconnecting/
                // sending an error /timeout) if the peer is removed from the chain for other
                // reasons. Check that this block belongs to the expected peer, and that the
                // request_id matches
                // TODO(das): removed peer_id matching as the node may request a different peer for data
                // columns.
                if !batch.is_expecting_block(&request_id) {
                    return Ok(KeepChain);
                }
                batch
            }
        };

        // A stream termination has been sent. This batch has ended. Process a completed batch.
        // Remove the request from the peer's active batches

        // TODO(das): should use peer group here https://github.com/sigp/lighthouse/issues/6258
        let received = batch.download_completed(blocks, *peer_id)?;
        let awaiting_batches = batch_id
            .saturating_sub(self.optimistic_start.unwrap_or(self.processing_target))
            / EPOCHS_PER_BATCH;
        debug!(epoch = %batch_id, blocks = received, batch_state = self.visualize_batch_state(), %awaiting_batches,"Batch downloaded");

        // pre-emptively request more blocks from peers whilst we process current blocks,
        self.request_batches(network)?;
        self.process_completed_batches(network)
    }

    /// Processes the batch with the given id.
    /// The batch must exist and be ready for processing
    #[instrument(parent = None,level = "info", fields(chain = self.id , service = "range_sync"), skip_all)]
    fn process_batch(
        &mut self,
        network: &mut SyncNetworkContext<T>,
        batch_id: BatchId,
    ) -> ProcessingResult {
        // Only process batches if this chain is Syncing, and only one at a time
        if self.state != ChainSyncingState::Syncing || self.current_processing_batch.is_some() {
            return Ok(KeepChain);
        }

        let Some(beacon_processor) = network.beacon_processor_if_enabled() else {
            return Ok(KeepChain);
        };

        let Some(batch) = self.batches.get_mut(&batch_id) else {
            return Err(RemoveChain::WrongChainState(format!(
                "Trying to process a batch that does not exist: {}",
                batch_id
            )));
        };

        // NOTE: We send empty batches to the processor in order to trigger the block processor
        // result callback. This is done, because an empty batch could end a chain and the logic
        // for removing chains and checking completion is in the callback.

        let (blocks, duration_in_awaiting_processing) = batch.start_processing()?;
        metrics::observe_duration(
            &metrics::SYNCING_CHAIN_BATCH_AWAITING_PROCESSING,
            duration_in_awaiting_processing,
        );

        let process_id = ChainSegmentProcessId::RangeBatchId(self.id, batch_id);
        self.current_processing_batch = Some(batch_id);

        if let Err(e) = beacon_processor.send_chain_segment(process_id, blocks) {
            crit!(msg = "process_batch",error = %e, batch = ?self.processing_target, "Failed to send chain segment to processor.");
            // This is unlikely to happen but it would stall syncing since the batch now has no
            // blocks to continue, and the chain is expecting a processing result that won't
            // arrive.  To mitigate this, (fake) fail this processing so that the batch is
            // re-downloaded.
            self.on_batch_process_result(network, batch_id, &BatchProcessResult::NonFaultyFailure)
        } else {
            Ok(KeepChain)
        }
    }

    /// Processes the next ready batch, prioritizing optimistic batches over the processing target.
    #[instrument(parent = None,level = "info", fields(chain = self.id , service = "range_sync"), skip_all)]
    fn process_completed_batches(
        &mut self,
        network: &mut SyncNetworkContext<T>,
    ) -> ProcessingResult {
        // Only process batches if this chain is Syncing and only process one batch at a time
        if self.state != ChainSyncingState::Syncing || self.current_processing_batch.is_some() {
            return Ok(KeepChain);
        }

        // Find the id of the batch we are going to process.
        //
        // First try our optimistic start, if any. If this batch is ready, we process it. If the
        // batch has not already been completed, check the current chain target.
        if let Some(epoch) = self.optimistic_start {
            if let Some(batch) = self.batches.get(&epoch) {
                let state = batch.state();
                match state {
                    BatchState::AwaitingProcessing(..) => {
                        // this batch is ready
                        debug!(%epoch, "Processing optimistic start");
                        return self.process_batch(network, epoch);
                    }
                    BatchState::Downloading(..) => {
                        // The optimistic batch is being downloaded. We wait for this before
                        // attempting to process other batches.
                        return Ok(KeepChain);
                    }
                    BatchState::Poisoned => unreachable!("Poisoned batch"),
                    BatchState::Processing(_)
                    | BatchState::AwaitingDownload
                    | BatchState::Failed => {
                        // these are all inconsistent states:
                        // - Processing -> `self.current_processing_batch` is None
                        // - Failed -> non recoverable batch. For an optimistic batch, it should
                        //   have been removed
                        // - AwaitingDownload -> A recoverable failed batch should have been
                        //   re-requested.
                        return Err(RemoveChain::WrongChainState(format!(
                            "Optimistic batch indicates inconsistent chain state: {:?}",
                            state
                        )));
                    }
                    BatchState::AwaitingValidation(_) => {
                        // If an optimistic start is given to the chain after the corresponding
                        // batch has been requested and processed we can land here. We drop the
                        // optimistic candidate since we can't conclude whether the batch included
                        // blocks or not at this point
                        debug!(batch = %epoch, "Dropping optimistic candidate");
                        self.optimistic_start = None;
                    }
                }
            }
        }

        // if the optimistic target can't be processed, check the processing target
        if let Some(batch) = self.batches.get(&self.processing_target) {
            let state = batch.state();
            match state {
                BatchState::AwaitingProcessing(..) => {
                    return self.process_batch(network, self.processing_target);
                }
                BatchState::Downloading(..) => {
                    // Batch is not ready, nothing to process
                }
                BatchState::Poisoned => unreachable!("Poisoned batch"),
                BatchState::Failed | BatchState::AwaitingDownload | BatchState::Processing(_) => {
                    // these are all inconsistent states:
                    // - Failed -> non recoverable batch. Chain should have beee removed
                    // - AwaitingDownload -> A recoverable failed batch should have been
                    //   re-requested.
                    // - Processing -> `self.current_processing_batch` is None
                    return Err(RemoveChain::WrongChainState(format!(
                        "Robust target batch indicates inconsistent chain state: {:?}",
                        state
                    )));
                }
                BatchState::AwaitingValidation(_) => {
                    // we can land here if an empty optimistic batch succeeds processing and is
                    // inside the download buffer (between `self.processing_target` and
                    // `self.to_be_downloaded`). In this case, eventually the chain advances to the
                    // batch (`self.processing_target` reaches this point).
                    debug!(
                        batch = %self.processing_target,
                        "Chain encountered a robust batch awaiting validation"
                    );

                    self.processing_target += EPOCHS_PER_BATCH;
                    if self.to_be_downloaded <= self.processing_target {
                        self.to_be_downloaded = self.processing_target + EPOCHS_PER_BATCH;
                    }
                    self.request_batches(network)?;
                }
            }
        } else if !self.good_peers_on_sampling_subnets(self.processing_target, network) {
            // This is to handle the case where no batch was sent for the current processing
            // target when there is no sampling peers available. This is a valid state and should not
            // return an error.
            return Ok(KeepChain);
        } else {
            return Err(RemoveChain::WrongChainState(format!(
                "Batch not found for current processing target {}",
                self.processing_target
            )));
        }
        Ok(KeepChain)
    }

    /// The block processor has completed processing a batch. This function handles the result
    /// of the batch processor.
    #[instrument(parent = None,level = "info", fields(chain = self.id , service = "range_sync"), skip_all)]
    pub fn on_batch_process_result(
        &mut self,
        network: &mut SyncNetworkContext<T>,
        batch_id: BatchId,
        result: &BatchProcessResult,
    ) -> ProcessingResult {
        // the first two cases are possible if the chain advances while waiting for a processing
        // result
        let batch_state = self.visualize_batch_state();
        let batch = match &self.current_processing_batch {
            Some(processing_id) if *processing_id != batch_id => {
                debug!(batch_epoch = %batch_id, expected_batch_epoch = %processing_id,"Unexpected batch result");
                return Ok(KeepChain);
            }
            None => {
                debug!(batch_epoch = %batch_id,"Chain was not expecting a batch result");
                return Ok(KeepChain);
            }
            _ => {
                // batch_id matches, continue
                self.current_processing_batch = None;
                self.batches.get_mut(&batch_id).ok_or_else(|| {
                    RemoveChain::WrongChainState(format!(
                        "Current processing batch not found: {}",
                        batch_id
                    ))
                })?
            }
        };

        let peer = batch.processing_peer().cloned().ok_or_else(|| {
            RemoveChain::WrongBatchState(format!(
                "Processing target is in wrong state: {:?}",
                batch.state(),
            ))
        })?;

        // Log the process result and the batch for debugging purposes.
        debug!(
            result = ?result,
            batch_epoch = %batch_id,
            client = %network.client_type(&peer),
            batch_state = ?batch_state,
            ?batch,
            "Batch processing result"
        );

        // We consider three cases. Batch was successfully processed, Batch failed processing due
        // to a faulty peer, or batch failed processing but the peer can't be deemed faulty.
        match result {
            BatchProcessResult::Success {
                sent_blocks,
                imported_blocks,
            } => {
                if sent_blocks > imported_blocks {
                    let ignored_blocks = sent_blocks - imported_blocks;
                    metrics::inc_counter_vec_by(
                        &metrics::SYNCING_CHAINS_IGNORED_BLOCKS,
                        &[self.chain_type.into()],
                        ignored_blocks as u64,
                    );
                }
                metrics::inc_counter_vec(
                    &metrics::SYNCING_CHAINS_PROCESSED_BATCHES,
                    &[self.chain_type.into()],
                );

                batch.processing_completed(BatchProcessingResult::Success)?;

                // was not empty = sent_blocks > 0
                if *sent_blocks > 0 {
                    // If the processed batch was not empty, we can validate previous unvalidated
                    // blocks.
                    self.advance_chain(network, batch_id);
                    // we register so that on chain switching we don't try it again
                    self.attempted_optimistic_starts.insert(batch_id);
                } else if self.optimistic_start == Some(batch_id) {
                    // check if this batch corresponds to an optimistic batch. In this case, we
                    // reject it as an optimistic candidate since the batch was empty
                    self.reject_optimistic_batch(
                        network,
                        false, /* do not re-request */
                        "batch was empty",
                    )?;
                }

                if batch_id == self.processing_target {
                    self.processing_target += EPOCHS_PER_BATCH;
                }

                // check if the chain has completed syncing
                if self.current_processed_slot() >= self.target_head_slot {
                    // chain is completed
                    Err(RemoveChain::ChainCompleted)
                } else {
                    // chain is not completed
                    // attempt to request more batches
                    self.request_batches(network)?;
                    // attempt to process more batches
                    self.process_completed_batches(network)
                }
            }
            BatchProcessResult::FaultyFailure {
                imported_blocks,
                penalty,
            } => {
                // Penalize the peer appropiately.
                network.report_peer(peer, *penalty, "faulty_batch");

                // Check if this batch is allowed to continue
                match batch.processing_completed(BatchProcessingResult::FaultyFailure)? {
                    BatchOperationOutcome::Continue => {
                        // Chain can continue. Check if it can be moved forward.
                        if *imported_blocks > 0 {
                            // At least one block was successfully verified and imported, so we can be sure all
                            // previous batches are valid and we only need to download the current failed
                            // batch.
                            self.advance_chain(network, batch_id);
                        }
                        // Handle this invalid batch, that is within the re-process retries limit.
                        self.handle_invalid_batch(network, batch_id)
                    }
                    BatchOperationOutcome::Failed { blacklist } => {
                        // Check that we have not exceeded the re-process retry counter,
                        // If a batch has exceeded the invalid batch lookup attempts limit, it means
                        // that it is likely all peers in this chain are are sending invalid batches
                        // repeatedly and are either malicious or faulty. We drop the chain and
                        // report all peers.
                        // There are some edge cases with forks that could land us in this situation.
                        // This should be unlikely, so we tolerate these errors, but not often.
                        warn!(
                            score_adjustment = %penalty,
                            batch_epoch = %batch_id,
                            "Batch failed to download. Dropping chain scoring peers"
                        );

                        for peer in self.peers.drain() {
                            network.report_peer(peer, *penalty, "faulty_chain");
                        }
                        Err(RemoveChain::ChainFailed {
                            blacklist,
                            failing_batch: batch_id,
                        })
                    }
                }
            }
            BatchProcessResult::NonFaultyFailure => {
                batch.processing_completed(BatchProcessingResult::NonFaultyFailure)?;
                // Simply redownload the batch.
                self.send_batch(network, batch_id)
            }
        }
    }

    #[instrument(parent = None,level = "info", fields(chain = self.id , service = "range_sync"), skip_all)]
    fn reject_optimistic_batch(
        &mut self,
        network: &mut SyncNetworkContext<T>,
        redownload: bool,
        reason: &str,
    ) -> ProcessingResult {
        if let Some(epoch) = self.optimistic_start.take() {
            self.attempted_optimistic_starts.insert(epoch);
            // if this batch is inside the current processing range, keep it, otherwise drop
            // it. NOTE: this is done to prevent non-sequential batches coming from optimistic
            // starts from filling up the buffer size
            if epoch < self.to_be_downloaded {
                debug!(%epoch, reason, "Rejected optimistic batch left for future use");
                // this batch is now treated as any other batch, and re-requested for future use
                if redownload {
                    return self.send_batch(network, epoch);
                }
            } else {
                debug!(%epoch, reason, "Rejected optimistic batch");
                self.batches.remove(&epoch);
            }
        }

        Ok(KeepChain)
    }

    /// Removes any batches previous to the given `validating_epoch` and updates the current
    /// boundaries of the chain.
    ///
    /// The `validating_epoch` must align with batch boundaries.
    ///
    /// If a previous batch has been validated and it had been re-processed, penalize the original
    /// peer.
    #[allow(clippy::modulo_one)]
    #[instrument(parent = None,level = "info", fields(chain = self.id , service = "range_sync"), skip_all)]
    fn advance_chain(&mut self, network: &mut SyncNetworkContext<T>, validating_epoch: Epoch) {
        // make sure this epoch produces an advancement
        if validating_epoch <= self.start_epoch {
            return;
        }

        // safety check for batch boundaries
        if validating_epoch % EPOCHS_PER_BATCH != self.start_epoch % EPOCHS_PER_BATCH {
            crit!("Validating Epoch is not aligned");
            return;
        }

        // batches in the range [BatchId, ..) (not yet validated)
        let remaining_batches = self.batches.split_off(&validating_epoch);
        // batches less than `validating_epoch`
        let removed_batches = std::mem::replace(&mut self.batches, remaining_batches);

        for (id, batch) in removed_batches.into_iter() {
            // only for batches awaiting validation can we be sure the last attempt is
            // right, and thus, that any different attempt is wrong
            match batch.state() {
                BatchState::AwaitingValidation(ref processed_attempt) => {
                    for attempt in batch.attempts() {
                        // The validated batch has been re-processed
                        if attempt.hash != processed_attempt.hash {
                            // The re-downloaded version was different
                            if processed_attempt.peer_id != attempt.peer_id {
                                // A different peer sent the correct batch, the previous peer did not
                                // We negatively score the original peer.
                                let action = PeerAction::LowToleranceError;
                                debug!(
                                    batch_epoch = %id, score_adjustment = %action,
                                    original_peer = %attempt.peer_id, new_peer = %processed_attempt.peer_id,
                                    "Re-processed batch validated. Scoring original peer"
                                );
                                network.report_peer(
                                    attempt.peer_id,
                                    action,
                                    "batch_reprocessed_original_peer",
                                );
                            } else {
                                // The same peer corrected it's previous mistake. There was an error, so we
                                // negative score the original peer.
                                let action = PeerAction::MidToleranceError;
                                debug!(
                                    batch_epoch = %id,
                                    score_adjustment = %action,
                                    original_peer = %attempt.peer_id,
                                    new_peer = %processed_attempt.peer_id,
                                    "Re-processed batch validated by the same peer"
                                );
                                network.report_peer(
                                    attempt.peer_id,
                                    action,
                                    "batch_reprocessed_same_peer",
                                );
                            }
                        }
                    }
                }
                BatchState::Downloading(..) => {}
                BatchState::Failed | BatchState::Poisoned | BatchState::AwaitingDownload => {
                    crit!("batch indicates inconsistent chain state while advancing chain")
                }
                BatchState::AwaitingProcessing(..) => {}
                BatchState::Processing(_) => {
                    debug!(batch = %id, %batch, "Advancing chain while processing a batch");
                    if let Some(processing_id) = self.current_processing_batch {
                        if id <= processing_id {
                            self.current_processing_batch = None;
                        }
                    }
                }
            }
        }

        self.processing_target = self.processing_target.max(validating_epoch);
        let old_start = self.start_epoch;
        self.start_epoch = validating_epoch;
        self.to_be_downloaded = self.to_be_downloaded.max(validating_epoch);
        if self.batches.contains_key(&self.to_be_downloaded) {
            // if a chain is advanced by Range beyond the previous `self.to_be_downloaded`, we
            // won't have this batch, so we need to request it.
            self.to_be_downloaded += EPOCHS_PER_BATCH;
        }
        if let Some(epoch) = self.optimistic_start {
            if epoch <= validating_epoch {
                self.optimistic_start = None;
            }
        }
        debug!(
            previous_start = %old_start,
            new_start = %self.start_epoch,
            processing_target = %self.processing_target,
            "Chain advanced"
        );
    }

    /// An invalid batch has been received that could not be processed, but that can be retried.
    ///
    /// These events occur when a peer has successfully responded with blocks, but the blocks we
    /// have received are incorrect or invalid. This indicates the peer has not performed as
    /// intended and can result in downvoting a peer.
    #[instrument(parent = None,level = "info", fields(service = self.id, network), skip_all)]
    fn handle_invalid_batch(
        &mut self,
        network: &mut SyncNetworkContext<T>,
        batch_id: BatchId,
    ) -> ProcessingResult {
        // The current batch could not be processed, indicating either the current or previous
        // batches are invalid.

        // The previous batch could be incomplete due to the block sizes being too large to fit in
        // a single RPC request or there could be consecutive empty batches which are not supposed
        // to be there

        // The current (sub-optimal) strategy is to simply re-request all batches that could
        // potentially be faulty. If a batch returns a different result than the original and
        // results in successful processing, we downvote the original peer that sent us the batch.

        if let Some(epoch) = self.optimistic_start {
            // If this batch is an optimistic batch, we reject this epoch as an optimistic
            // candidate and try to re download it
            if epoch == batch_id {
                return self.reject_optimistic_batch(network, true, "batch was invalid");
                // since this is the optimistic batch, we can't consider previous batches as
                // invalid.
            }
        }
        // this is our robust `processing_target`. All previous batches must be awaiting
        // validation
        let mut redownload_queue = Vec::new();

        for (id, batch) in self.batches.range_mut(..batch_id) {
            if let BatchOperationOutcome::Failed { blacklist } = batch.validation_failed()? {
                // remove the chain early
                return Err(RemoveChain::ChainFailed {
                    blacklist,
                    failing_batch: *id,
                });
            }
            redownload_queue.push(*id);
        }

        // no batch maxed out it process attempts, so now the chain's volatile progress must be
        // reset
        self.processing_target = self.start_epoch;

        for id in redownload_queue {
            self.send_batch(network, id)?;
        }
        // finally, re-request the failed batch.
        self.send_batch(network, batch_id)
    }

    pub fn stop_syncing(&mut self) {
        self.state = ChainSyncingState::Stopped;
    }

    /// Either a new chain, or an old one with a peer list
    /// This chain has been requested to start syncing.
    ///
    /// This could be new chain, or an old chain that is being resumed.
    #[instrument(parent = None,level = "info", fields(chain = self.id , service = "range_sync"), skip_all)]
    pub fn start_syncing(
        &mut self,
        network: &mut SyncNetworkContext<T>,
        local_finalized_epoch: Epoch,
        optimistic_start_epoch: Epoch,
    ) -> ProcessingResult {
        // to avoid dropping local progress, we advance the chain wrt its batch boundaries. This
        let align = |epoch| {
            // start_epoch + (number of batches in between)*length_of_batch
            self.start_epoch + ((epoch - self.start_epoch) / EPOCHS_PER_BATCH) * EPOCHS_PER_BATCH
        };
        // get the *aligned* epoch that produces a batch containing the `local_finalized_epoch`
        let validating_epoch = align(local_finalized_epoch);
        // align the optimistic_start too.
        let optimistic_epoch = align(optimistic_start_epoch);

        // advance the chain to the new validating epoch
        self.advance_chain(network, validating_epoch);
        if self.optimistic_start.is_none()
            && optimistic_epoch > self.processing_target
            && !self.attempted_optimistic_starts.contains(&optimistic_epoch)
        {
            self.optimistic_start = Some(optimistic_epoch);
        }

        // update the state
        self.state = ChainSyncingState::Syncing;

        // begin requesting blocks from the peer pool, until all peers are exhausted.
        self.request_batches(network)?;

        // start processing batches if needed
        self.process_completed_batches(network)
    }

    /// Add a peer to the chain.
    ///
    /// If the chain is active, this starts requesting batches from this peer.
    #[instrument(parent = None,level = "info", fields(chain = self.id , service = "range_sync"), skip_all)]
    pub fn add_peer(
        &mut self,
        network: &mut SyncNetworkContext<T>,
        peer_id: PeerId,
    ) -> ProcessingResult {
        self.peers.insert(peer_id);
        self.request_batches(network)
    }

    /// An RPC error has occurred.
    ///
    /// If the batch exists it is re-requested.
    #[instrument(parent = None,level = "info", fields(chain = self.id , service = "range_sync"), skip_all)]
    pub fn inject_error(
        &mut self,
        network: &mut SyncNetworkContext<T>,
        batch_id: BatchId,
        peer_id: &PeerId,
        request_id: Id,
        err: RpcResponseError,
    ) -> ProcessingResult {
        let batch_state = self.visualize_batch_state();
        if let Some(batch) = self.batches.get_mut(&batch_id) {
            // A batch could be retried without the peer failing the request (disconnecting/
            // sending an error /timeout) if the peer is removed from the chain for other
            // reasons. Check that this block belongs to the expected peer
            // TODO(das): removed peer_id matching as the node may request a different peer for data
            // columns.
            if !batch.is_expecting_block(&request_id) {
                debug!(
                    batch_epoch = %batch_id,
                    batch_state = ?batch.state(),
                    %peer_id,
                    %request_id,
                    ?batch_state,
                    "Batch not expecting block"
                );
                return Ok(KeepChain);
            }
            debug!(
                batch_epoch = %batch_id,
                batch_state = ?batch.state(),
                error = ?err,
                %peer_id,
                %request_id,
                "Batch download error"
            );
            if let BatchOperationOutcome::Failed { blacklist } =
                batch.download_failed(Some(*peer_id))?
            {
                return Err(RemoveChain::ChainFailed {
                    blacklist,
                    failing_batch: batch_id,
                });
            }
            self.send_batch(network, batch_id)
        } else {
            debug!(
                batch_epoch = %batch_id,
                %peer_id,
                %request_id,
                batch_state,
                "Batch not found"
            );
            // this could be an error for an old batch, removed when the chain advances
            Ok(KeepChain)
        }
    }

    /// Requests the batch assigned to the given id from a given peer.
    #[instrument(parent = None,level = "info", fields(chain = self.id , service = "range_sync"), skip_all)]
    pub fn send_batch(
        &mut self,
        network: &mut SyncNetworkContext<T>,
        batch_id: BatchId,
    ) -> ProcessingResult {
        let batch_state = self.visualize_batch_state();
        if let Some(batch) = self.batches.get_mut(&batch_id) {
            let (request, batch_type) = batch.to_blocks_by_range_request();
            let failed_peers = batch.failed_peers();

            // TODO(das): we should request only from peers that are part of this SyncingChain.
            // However, then we hit the NoPeer error frequently which causes the batch to fail and
            // the SyncingChain to be dropped. We need to handle this case more gracefully.
            let synced_peers = network
                .network_globals()
                .peers
                .read()
                .synced_peers()
                .cloned()
                .collect::<HashSet<_>>();

            match network.block_components_by_range_request(
                batch_type,
                request,
                RangeRequestId::RangeSync {
                    chain_id: self.id,
                    batch_id,
                },
                &synced_peers,
                &failed_peers,
            ) {
                Ok(request_id) => {
                    // inform the batch about the new request
                    batch.start_downloading(request_id)?;
                    if self
                        .optimistic_start
                        .map(|epoch| epoch == batch_id)
                        .unwrap_or(false)
                    {
                        debug!(epoch = %batch_id, %batch, %batch_state, "Requesting optimistic batch");
                    } else {
                        debug!(epoch = %batch_id, %batch, %batch_state, "Requesting batch");
                    }
                    return Ok(KeepChain);
                }
                Err(e) => match e {
                    // TODO(das): Handle the NoPeer case explicitly and don't drop the batch. For
                    // sync to work properly it must be okay to have "stalled" batches in
                    // AwaitingDownload state. Currently it will error with invalid state if
                    // that happens. Sync manager must periodicatlly prune stalled batches like
                    // we do for lookup sync. Then we can deprecate the redundant
                    // `good_peers_on_sampling_subnets` checks.
                    e
                    @ (RpcRequestSendError::NoPeer(_) | RpcRequestSendError::InternalError(_)) => {
                        // NOTE: under normal conditions this shouldn't happen but we handle it anyway
                        warn!(%batch_id, error = ?e, "batch_id" = %batch_id, %batch, "Could not send batch request");
                        // register the failed download and check if the batch can be retried
                        batch.start_downloading(1)?; // fake request_id = 1 is not relevant
                        match batch.download_failed(None)? {
                            BatchOperationOutcome::Failed { blacklist } => {
                                return Err(RemoveChain::ChainFailed {
                                    blacklist,
                                    failing_batch: batch_id,
                                })
                            }
                            BatchOperationOutcome::Continue => {
                                return self.send_batch(network, batch_id)
                            }
                        }
                    }
                },
            }
        }

        Ok(KeepChain)
    }

    /// Returns true if this chain is currently syncing.
    #[instrument(parent = None,level = "info", fields(chain = self.id , service = "range_sync"), skip_all)]
    pub fn is_syncing(&self) -> bool {
        match self.state {
            ChainSyncingState::Syncing => true,
            ChainSyncingState::Stopped => false,
        }
    }

    /// Kickstarts the chain by sending for processing batches that are ready and requesting more
    /// batches if needed.
    #[instrument(parent = None,level = "info", fields(chain = self.id , service = "range_sync"), skip_all)]
    pub fn resume(
        &mut self,
        network: &mut SyncNetworkContext<T>,
    ) -> Result<KeepChain, RemoveChain> {
        // Request more batches if needed.
        self.request_batches(network)?;
        // If there is any batch ready for processing, send it.
        self.process_completed_batches(network)
    }

    /// Attempts to request the next required batches from the peer pool if the chain is syncing. It will exhaust the peer
    /// pool and left over batches until the batch buffer is reached or all peers are exhausted.
    #[instrument(parent = None,level = "info", fields(chain = self.id , service = "range_sync"), skip_all)]
    fn request_batches(&mut self, network: &mut SyncNetworkContext<T>) -> ProcessingResult {
        if !matches!(self.state, ChainSyncingState::Syncing) {
            return Ok(KeepChain);
        }

        // find the next pending batch and request it from the peer

        // check if we have the batch for our optimistic start. If not, request it first.
        // We wait for this batch before requesting any other batches.
        if let Some(epoch) = self.optimistic_start {
            if !self.good_peers_on_sampling_subnets(epoch, network) {
                debug!("Waiting for peers to be available on sampling column subnets");
                return Ok(KeepChain);
            }

            if let Entry::Vacant(entry) = self.batches.entry(epoch) {
                let batch_type = network.batch_type(epoch);
                let optimistic_batch = BatchInfo::new(&epoch, EPOCHS_PER_BATCH, batch_type);
                entry.insert(optimistic_batch);
                self.send_batch(network, epoch)?;
            }
            return Ok(KeepChain);
        }

        // find the next pending batch and request it from the peer
        // Note: for this function to not infinite loop we must:
        // - If `include_next_batch` returns Some we MUST increase the count of batches that are
        //   accounted in the `BACKFILL_BATCH_BUFFER_SIZE` limit in the `matches!` statement of
        //   that function.
        while let Some(batch_id) = self.include_next_batch(network) {
            // send the batch
            self.send_batch(network, batch_id)?;
        }

        // No more batches, simply stop
        Ok(KeepChain)
    }

    /// Checks all sampling column subnets for peers. Returns `true` if there is at least one peer in
    /// every sampling column subnet.
    fn good_peers_on_sampling_subnets(
        &self,
        epoch: Epoch,
        network: &SyncNetworkContext<T>,
    ) -> bool {
        if network.chain.spec.is_peer_das_enabled_for_epoch(epoch) {
            // Require peers on all sampling column subnets before sending batches
            let peers_on_all_custody_subnets = network
                .network_globals()
                .sampling_subnets
                .iter()
                .all(|subnet_id| {
                    let peer_count = network
                        .network_globals()
                        .peers
                        .read()
                        .good_custody_subnet_peer(*subnet_id)
                        .count();

                    peer_count > 0
                });
            peers_on_all_custody_subnets
        } else {
            true
        }
    }

    /// Creates the next required batch from the chain. If there are no more batches required,
    /// `false` is returned.
    #[instrument(parent = None,level = "info", fields(chain = self.id , service = "range_sync"), skip_all)]
    fn include_next_batch(&mut self, network: &mut SyncNetworkContext<T>) -> Option<BatchId> {
        // don't request batches beyond the target head slot
        if self
            .to_be_downloaded
            .start_slot(T::EthSpec::slots_per_epoch())
            >= self.target_head_slot
        {
            return None;
        }

        // only request batches up to the buffer size limit
        // NOTE: we don't count batches in the AwaitingValidation state, to prevent stalling sync
        // if the current processing window is contained in a long range of skip slots.
        let in_buffer = |batch: &BatchInfo<T::EthSpec>| {
            matches!(
                batch.state(),
                BatchState::Downloading(..) | BatchState::AwaitingProcessing(..)
            )
        };
        if self
            .batches
            .iter()
            .filter(|&(_epoch, batch)| in_buffer(batch))
            .count()
            > BATCH_BUFFER_SIZE as usize
        {
            return None;
        }

        // don't send batch requests until we have peers on sampling subnets
        // TODO(das): this is a workaround to avoid sending out excessive block requests because
        // block and data column requests are currently coupled. This can be removed once we find a
        // way to decouple the requests and do retries individually, see issue #6258.
        if !self.good_peers_on_sampling_subnets(self.to_be_downloaded, network) {
            debug!("Waiting for peers to be available on custody column subnets");
            return None;
        }

        // If no batch needs a retry, attempt to send the batch of the next epoch to download
        let next_batch_id = self.to_be_downloaded;
        // this batch could have been included already being an optimistic batch
        match self.batches.entry(next_batch_id) {
            Entry::Occupied(_) => {
                // this batch doesn't need downloading, let this same function decide the next batch
                self.to_be_downloaded += EPOCHS_PER_BATCH;
                self.include_next_batch(network)
            }
            Entry::Vacant(entry) => {
                let batch_type = network.batch_type(next_batch_id);
                entry.insert(BatchInfo::new(&next_batch_id, EPOCHS_PER_BATCH, batch_type));
                self.to_be_downloaded += EPOCHS_PER_BATCH;
                Some(next_batch_id)
            }
        }
    }

    /// Creates a string visualization of the current state of the chain, to make it easier for debugging and understanding
    /// where sync is up to from glancing at the logs.
    ///
    /// This produces a string of the form: [D,E,E,E,E]
    /// to indicate the current buffer state of the chain. The symbols are defined on each of the
    /// batch states. See [BatchState::visualize] for symbol definitions.
    #[instrument(parent = None,level = "info", fields(chain = self.id , service = "range_sync"), skip_all)]
    fn visualize_batch_state(&self) -> String {
        let mut visualization_string = String::with_capacity((BATCH_BUFFER_SIZE * 3) as usize);

        // Start of the block
        visualization_string.push('[');

        for mut batch_index in 0..BATCH_BUFFER_SIZE {
            if let Some(batch) = self
                .batches
                .get(&(self.processing_target + batch_index as u64 * EPOCHS_PER_BATCH))
            {
                visualization_string.push(batch.visualize());
                if batch_index != BATCH_BUFFER_SIZE {
                    // Add a comma in between elements
                    visualization_string.push(',');
                }
            } else {
                // No batch exists, it is on our list to be downloaded
                // Fill in the rest of the gaps
                while batch_index < BATCH_BUFFER_SIZE {
                    visualization_string.push('E');
                    // Add a comma between the empty batches
                    if batch_index < BATCH_BUFFER_SIZE.saturating_sub(1) {
                        visualization_string.push(',')
                    }
                    batch_index += 1;
                }
                break;
            }
        }
        visualization_string.push(']');
        visualization_string
    }
}

use super::batch::WrongState as WrongBatchState;
impl From<WrongBatchState> for RemoveChain {
    fn from(err: WrongBatchState) -> Self {
        RemoveChain::WrongBatchState(err.0)
    }
}

impl RemoveChain {
    pub fn is_critical(&self) -> bool {
        matches!(
            self,
            RemoveChain::WrongBatchState(..) | RemoveChain::WrongChainState(..)
        )
    }
}

impl From<RangeSyncType> for SyncingChainType {
    fn from(value: RangeSyncType) -> Self {
        match value {
            RangeSyncType::Head => Self::Head,
            RangeSyncType::Finalized => Self::Finalized,
        }
    }
}
